package org.marketcetera.marketdata;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.marketcetera.core.IFeedComponentListener;
import org.marketcetera.core.InMemoryIDFactory;
import org.marketcetera.core.InternalID;
import org.marketcetera.core.NoMoreIDsException;
import org.marketcetera.core.publisher.ISubscriber;
import org.marketcetera.core.publisher.PublisherEngine;
import org.marketcetera.event.AggregateEvent;
import org.marketcetera.event.Event;
import org.marketcetera.event.EventTranslator;
import org.marketcetera.marketdata.MarketDataFeedToken.Status;
import org.marketcetera.metrics.ConditionsFactory;
import org.marketcetera.metrics.ThreadedMetric;
import org.marketcetera.util.log.I18NBoundMessage1P;
import org.marketcetera.util.log.SLF4JLoggerProxy;
import org.marketcetera.util.misc.ClassVersion;
import org.marketcetera.util.misc.NamedThreadFactory;

/* $License$ */

/**
 * An abstract base class for all market data feeds. 
 * 
 * <p>Contains logic common to all market data feed implementations with 
 * the mechanics of adding/removing symbol/feed component listeners and keeping track of the feed 
 * status.
 * 
 * <p>Subclasses represent a connection to a specific market data feed.
 * 
 * <p>The generic types required are defined as follows:
 * <ul>
 *   <li>T - The token class for this feed</li>
 *   <li>C - The credentials class for this feed</li>
 *   <li>X - The message translator class for this feed</li>
 *   <li>E - The event translator class for this feed</li>
 *   <li>D - The type returned from {@link DataRequestTranslator#fromDataRequest(MarketDataRequest)}.</li>
 *   <li>F - The market data feed type itself</li>
 * </ul>
 *
 * @author andrei@lissovski.org
 * @author gmiller
 * @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
 * @since 0.5.0
 */
@ClassVersion("$Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $") //$NON-NLS-1$
public abstract class AbstractMarketDataFeed<T extends AbstractMarketDataFeedToken<F>, 
                                             C extends MarketDataFeedCredentials, 
                                             X extends DataRequestTranslator<D>, 
                                             E extends EventTranslator,
                                             D,
                                             F extends AbstractMarketDataFeed<?,?,?,?,?,?>> 
    implements MarketDataFeed<T,C> 
{
    /**
     * the id factory used to generate unique IDs within the context of all feeds for this JVM session
     */
    private static final InMemoryIDFactory sIDFactory = new InMemoryIDFactory(0,
                                                                              Long.toString(System.currentTimeMillis()));
    /**
     * the singleton instance that aggregates the actual connections to the server 
     */
    private static ExecutorService sPool = Executors.newCachedThreadPool(new NamedThreadFactory("AbsMarketDataFeed-"));  //$NON-NLS-1$
    /**
     * the status of the feed
     */
    private FeedStatus mFeedStatus = FeedStatus.UNKNOWN;
    /**
     * the type of the feed
     */
    private final FeedType mFeedType;
    /**
     * the feed ID
     */
    private final InternalID mID;
    /**
     * helper object to track the tokens and associated handles from requests and responses
     */
    private final HandleHolder mHandleHolder = new HandleHolder();
    /**
     * provider name associated with the data feed
     */
    private final String mProviderName;
    /**
     * publish/subscribe engine to manage feed status changes
     */
    private final PublisherEngine mFeedStatusPublisher = new PublisherEngine();
    /**
     * detailed messages generated by the data feeds
     */
    public final static String DATAFEED_TRACE_MESSAGES = "datafeed.trace"; //$NON-NLS-1$
    /**
     * status messages generated by the data feeds
     */
    public final static String DATAFEED_STATUS_MESSAGES = "datafeed.status"; //$NON-NLS-1$
    /**
     * common key used to create a well-known location for storing an indicator to tell a feed whether to simulate market data or
     * try for the real thing.  the intended use is to set this property in the System environment (accessible via System.getProperty),
     * but its use is not strictly limited to that by design.
     */
    public final static String MARKETDATA_SIMULATION_KEY = "allowMarketDataSimulation"; //$NON-NLS-1$
    /**
     * Condition used to determine if performance instrumentation should be
     * recording sampled instrumentation data.
     */
    private static final Callable<Boolean> PUBLISHING_CONDITION = ConditionsFactory.createSamplingCondition(100,
                                                                                                            "metc.metrics.marketdata.sampling.interval");  //$NON-NLS-1$
    /**
     * Indicates if the feed is allowed to simulate market data if the normal source is not
     * available.
     *
     * @return a <code>boolean</code> value
     */
    private boolean isSimulatedDataAllowed()
    {
        return Boolean.getBoolean(MARKETDATA_SIMULATION_KEY);
    }
    /**
     * Gets the next ID in sequence for assigning unique identifiers to market data feed objects.
     *
     * @return an <code>InternalID</code> value
     * @throws NoMoreIDsException if no more IDs are available
     */
    private static InternalID getNextID() 
        throws NoMoreIDsException
    {
        return new InternalID(sIDFactory.getNext());
    }
    /**
     * Create a new <code>AbstractMarketDataFeed</code> instance.
     *
     * @param inFeedType a <code>FeedType</code> value
     * @param inProviderName a <code>String</code> value
     * @throws NoMoreIDsException if no more ids are available to be assigned to this feed
     * @throws NullPointerException if the <code>FeedType</code> is null
     */
    protected AbstractMarketDataFeed(FeedType inFeedType,
                                     String inProviderName) 
        throws NoMoreIDsException
    {
        if(inFeedType == null) {
            throw new NullPointerException();
        }
        if(inProviderName == null) {
            throw new NullPointerException();
        }
        mFeedType = inFeedType;
        mID = getNextID();
        mProviderName = inProviderName;
        mFeedStatus = FeedStatus.OFFLINE;
        if(isSimulatedDataAllowed()) {
            SLF4JLoggerProxy.debug(this,
                                   "simulated data is allowed"); //$NON-NLS-1$
        } else {
            SLF4JLoggerProxy.debug(this,
                                   "simulated data is not allowed"); //$NON-NLS-1$
        }
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IMarketDataFeed#execute(org.marketcetera.marketdata.AbstractMarketDataFeedCredentials, DataRequest, org.marketcetera.core.publisher.ISubscriber)
     */
    @Override
    public final T execute(MarketDataFeedTokenSpec inTokenSpec)
            throws FeedException
    {
        // check the status of the feed - if it's not running, the feed isn't
        //  ready to be used
        if(!getFeedStatus().isRunning()) {
            throw new FeedException(new I18NBoundMessage1P(Messages.MARKET_DATA_FEED_CANNOT_EXEC_REQUESTS, getFeedStatus()));
        }
        // these subscribers are all the ones that are interested in the results
        //  of the query we're about to execute - this list may be empty or null
        ISubscriber[] subscribers = inTokenSpec.getSubscribers();
        // the token is used to track the request and its responses
        // generate a new token for this request
        T token;
        try {
            token = generateToken(inTokenSpec);
        } catch (Exception e) {
            throw new FeedException(e,
                                    Messages.ERROR_MARKET_DATA_FEED_EXECUTION_FAILED);
        }
        // it's possible that some messages won't need subscribers, perhaps if the caller doesn't care
        //  about responses.  if the subscriber is null, ignore it.  otherwise, set the token to receive
        //  the responses        
        if(subscribers != null) {
            token.subscribeAll(subscribers);
        }
        try {
            return submitToken(token);
        } catch (TimeoutException e) {
            throw new FeedException(e,
                                    Messages.ERROR_MARKET_DATA_FEED_EXECUTION_FAILED);
        }
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.MarketDataFeed#login(org.marketcetera.marketdata.MarketDataFeedCredentials)
     */
    @Override
    public final boolean login(C inCredentials)
    {
        SLF4JLoggerProxy.debug(this,
                               "Logging in to {}", //$NON-NLS-1$
                               this);
        if(isLoggedIn()) {
            SLF4JLoggerProxy.debug(this,
                                   "Already logged in!"); //$NON-NLS-1$
            return true;
        }
        if(doInitialize() &&
           (isSimulatedDataAllowed() ||
            doLogin(inCredentials))) {
            SLF4JLoggerProxy.debug(this,
                                   "Login successful, setting feed status to \"available\""); //$NON-NLS-1$
            setFeedStatus(FeedStatus.AVAILABLE);
            return true;
        }
        SLF4JLoggerProxy.debug(this,
                               "Login failed, setting feed status to \"offline\""); //$NON-NLS-1$
        setFeedStatus(FeedStatus.OFFLINE);
        return false;
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.MarketDataFeed#logout()
     */
    @Override
    public final void logout()
    {
        if(!isSimulatedDataAllowed()) {
            doLogout();
        }
        setFeedStatus(FeedStatus.OFFLINE);
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IFeedComponent#getFeedStatus()
     */
    public final FeedStatus getFeedStatus()
    {
        return mFeedStatus;
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IFeedComponent#getFeedType()
     */
    public final FeedType getFeedType()
    {
        return mFeedType;
    }
    /* (non-Javadoc)
     * @see org.springframework.context.Lifecycle#isRunning()
     */
    public boolean isRunning()
    {
        return getFeedStatus().isRunning();
    }
    /* (non-Javadoc)
     * @see org.springframework.context.Lifecycle#start()
     */
    public void start()
    {
        setFeedStatus(FeedStatus.UNKNOWN);
    }
    /* (non-Javadoc)
     * @see org.springframework.context.Lifecycle#stop()
     */
    public void stop()
    {
        // get all the tokens from active queries
        List<T> tokens = mHandleHolder.getTokens();
        for(T token : tokens) {
            // for each token, suspend the query.  this amounts to canceling the query
            //  on the datafeed side but keeping the token in the active list.  this
            //  accomplishes two goals:
            //   1) if the feed is truly stopping and not restarting, we clean up our
            //      connections with the datafeed
            //   2) if the feed is being restarted, we have a record of all the queries
            //      that were active before
            SLF4JLoggerProxy.debug(this, "Suspending {}", token); //$NON-NLS-1$
            token.setStatus(Status.SUSPENDED);
            // this performs the cancel on the datafeed side
            doHandleInvalidation(mHandleHolder.getHandles(token));
        }
        if(!isSimulatedDataAllowed()) {
            doLogout();
        }
        setFeedStatus(FeedStatus.OFFLINE);
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IFeedComponent#getID()
     */
    public final String getID()
    {
        return mID.toString();
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IFeedComponent#addFeedComponentListener(org.marketcetera.core.IFeedComponentListener)
     */
    public final void addFeedComponentListener(IFeedComponentListener inListener)
    {
        mFeedStatusPublisher.subscribe(new FeedComponentListenerWrapper(inListener,
                                                                        this));
    }
    /* (non-Javadoc)
     * @see org.marketcetera.marketdata.IFeedComponent#removeFeedComponentListener(org.marketcetera.core.IFeedComponentListener)
     */
    public final void removeFeedComponentListener(IFeedComponentListener inListener)
    {
        mFeedStatusPublisher.unsubscribe(new FeedComponentListenerWrapper(inListener,
                                                                          this));
    }
    /*
     * the following methods must be implemented by the feed subclasses and
     * represent the contract between subclass and parent
     */
    /**
     * Generates a token encapsulating the given request.
     * 
     * <p>The object returned is dedicated to the execution of the given message.
     * 
     * @param inTokenSpec a <code>MarketDataFeedTokenSpec</code> value encapsulating the data feed request
     * @return a <code>AbstractMarketDataFeedToken</code> value
     * @throws FeedException if an error occurs
     */
    protected abstract T generateToken(MarketDataFeedTokenSpec inTokenSpec)
        throws FeedException;
    /**
     * Executes the market data request represented by the passed value.
     * 
     * <p>The values returned in the handle list must be unique with respect
     * to the current JVM invocation for this data feed.
     *
     * @param inData a <code>D</code> value containing the data returned by
     *   the corresponding {@link DataRequestTranslator}.
     * @return a <code>List&lt;String&gt;</code> value containing the set of
     *   handles to be associated with this request
     * @throws FeedException if the request cannot be transmitted to the feed
     * @see DataRequestTranslator#fromDataRequest(MarketDataRequest)
     */
    protected abstract List<String> doMarketDataRequest(D inData)
        throws FeedException;
    /**
     * Returns an instance of {@link DataRequestTranslator} appropriate for this feed.
     *
     * <p>The {@link DataRequestTranslator} translates a <code>DataRequest</code> to a data-type
     * appropriate for this feed.
     * 
     * @return an <code>X</code> value
     */
    protected abstract X getMessageTranslator();
    /**
     * Determines if there exists an active and valid connection to the feed.
     * 
     * @return a <code>boolean</code> value
     */
    protected abstract boolean isLoggedIn();
    /**
     * Connects to the feed and supplies the given credentials.
     *
     * @param inCredentials a <code>C</code> value
     * @return a <code>boolean<code> value indicating whether the login
     *   was successful or not
     */
    protected abstract boolean doLogin(C inCredentials);
    /**
     * Disconnect from the feed.
     * @throws InterruptedException if the thread was interrupted during execution
    */
    protected abstract void doLogout();
    /**
     * Cancel the transaction associated with the given handle.
     *
     * @param inHandle a <code>String</code> value containing a handle
     *   meaningful to the feed
     */
    protected abstract void doCancel(String inHandle);
    /**
     * Returns an instance of {@link EventTranslator} appropriate for this feed.
     *
     * <p>The {@link EventTranslator} translates data-types appropriate for this feed
     * to subclasses of {@link Event}.
     * 
     * @return an <code>E</code> value
     */
    protected abstract E getEventTranslator();
    /*
     * the following methods *may* be overridden by implementing subclasses but
     * have default implementations as documented
     */
    /**
     * Performs any initialization steps necessary before {@link #doLogin(MarketDataFeedCredentials)}.
     *
     * <p>This implementation does nothing.
     * 
     * @return a <code>boolean</code> value indicating if the initialization was valid and
     *   processing may continue
     */
    protected boolean doInitialize()
    {
        return true;
    }
    /**
     * Called at the beginning of {@link #doExecute(AbstractMarketDataFeedToken)}. 
     *
     * <p>This implementation returns <code>true</code>.  Subclasses may override this
     * method to abort an execution if necessary.  Return <code>false</code> to abort
     * the execution.
     * 
     * @param inToken a <code>T</code> value
     * @return a <code>boolean</code> value
     */
    protected boolean beforeDoExecute(T inToken)
    {
        return true;
    }
    /**
     * Called at the end of {@link #doExecute(AbstractMarketDataFeedToken)}. 
     *
     * <p>This implementation does nothing.  Subclasses may override this
     * method to implement behavior required at the end of an execution.
     * This method will always be called, regardless of the success or failure
     * of the execution.
     * 
     * @param inToken a <code>T</code> value
     * @param inException a <code>Exception</code> value containing the exception thrown
     *   during execution or <code>null</code> if no exception was thrown
     */
    protected void afterDoExecute(T inToken, 
                                  Exception inException)
    {
    }
    /**
     * Gets the number of seconds to wait for an execution to be acknowledged.
     *
     * <p>This method returns 60.  Subclasses may override this method to return a different value.
     * The timeout applies to calls to {@link #execute(MarketDataFeedTokenSpec)}.  Note that the
     * feed is not obligated to return data within the time limit, merely to acknowledge log-in and
     * query submission.
     *  
     * @return a <code>long</code> value containing the number of seconds to wait before timing out
     */
    protected long getTimeout()
    {
        return 60;
    }
    /*
     * the following methods may be called by implementing subclasses but may not
     * be overridden
     */
    /**
     * Sets the status of the feed.
     * 
     * @param inFeedStatus a <code>FeedStatus</code> value
     * @throws NullPointerException if <code>inFeedStatus</code> is null
     */
    protected final void setFeedStatus(FeedStatus inFeedStatus)
    {
        SLF4JLoggerProxy.debug(this,
                               "Setting feed status to {}", //$NON-NLS-1$
                               inFeedStatus);
        if(inFeedStatus == null) {
            throw new NullPointerException();
        }
        synchronized(this) {
            // if the new status is not the same as the old one, act on it, otherwise ignore it to reduce noise
            if(!(mFeedStatus.equals(inFeedStatus))) {
                // if the old status was "off-line" and the new status is "available" then
                //  we need to do something to get the ball back rolling
                try {
                    if(!mFeedStatus.isRunning() &&
                       inFeedStatus.isRunning()) {
                        doReconnectToFeed();
                    }
                } finally {
                    // regardless of whether the above succeeds or fails, make sure
                    //  that the feed status is set appropriately and the subscribers
                    //  are notified
                    mFeedStatus = inFeedStatus;
                    mFeedStatusPublisher.publish(this);
                    notifyAll();
                }
            }
        }
    }
    /**
     * Registers data received from the feed in association with the
     * given handle.
     * 
     * <p>Subclasses should call this method to process data received
     * from the data feed.
     *
     * @param inHandle a <code>MarketDataHandle</code> value
     * @param inData an <code>Object</code> value containing the data
     *   to be transmitted to subscribers
     */
    protected final void dataReceived(String inHandle,
                                      Object inData)
    {
        ThreadedMetric.begin();
        try {
            MarketDataHandle mdHandle = compose(inHandle);
            T token = mHandleHolder.getToken(mdHandle);
            if(token == null) {
                Messages.WARNING_MARKET_DATA_FEED_DATA_IGNORED.debug(this,
                                                                     inData);
            } else {
                try {
                    E eventTranslator = getEventTranslator();
                    List<Event> events = eventTranslator.toEvent(inData,
                                                                 inHandle);
                    // events returned may contain aggregate events, which further need to be decomposed
                    // create a list of actual events
                    List<Event> actualEvents = new ArrayList<Event>();
                    // check the events returned to find aggregate events, if any
                    for(Event event : events) {
                        if(event instanceof AggregateEvent) {
                            AggregateEvent ae = (AggregateEvent)event;
                            actualEvents.addAll(ae.decompose());
                        } else {
                            actualEvents.add(event);
                        }
                    }
                    ThreadedMetric.event("mdata-translated");  //$NON-NLS-1$
                    // now publish the complete list of events in the proper order
                    for(Event event : actualEvents) {
                        event.setSource(token);
                        token.publish(event);
                    }
                } catch (Exception e) {
                    Messages.WARNING_MARKET_DATA_FEED_DATA_IGNORED.warn(this,
                                                                        e,
                                                                        inData);
                }
            }
        } finally {
            ThreadedMetric.end(PUBLISHING_CONDITION);
        }
    }
    /*
     * the following methods can be called by other package classes
     */
    /**
     * Cancels the active request represented by the given token.
     * 
     * <p>Every effort will be made to make sure each query associated with
     * the given token is canceled. 
     *
     * @param inToken a <code>T</code> value
     * @throws NullPointerException if <code>inToken</code> is null
     */
    final void cancel(T inToken)
    {
        if(inToken == null) {
            throw new NullPointerException();
        }
        try {
            // translate token to handle or handles
            List<MarketDataHandle> marketDataHandles = mHandleHolder.removeToken(inToken);
            if(marketDataHandles != null) {
                // pass handles to subclass to execute cancel
                for(MarketDataHandle marketDataHandle : marketDataHandles) {
                    try {
                        doCancel(decompose(marketDataHandle));
                    } catch (Exception e) {
                        Messages.WARNING_MARKET_DATA_FEED_CANNOT_CANCEL_SUBSCRIPTION.warn(this, e, marketDataHandle);
                    }
                }
            }
        } finally {
            inToken.setStatus(Status.CANCELED);
        }
    }
    /*
     * the following methods are helper methods for this class
     */
    /**
     * Performs the execution of the market data request.
     *
     * @param inToken a <code>T</code> value
     * @return a <code>boolean</code> value
     */
    private boolean doExecute(T inToken)
    {
        Exception thrownException = null;        
        try {
            if(!beforeDoExecute(inToken)) {
                return false;
            }
            // translate fix message to specialized type
            X xlator = getMessageTranslator();
            // the request is represented by a request stored on the token
            MarketDataRequest request = inToken.getTokenSpec().getDataRequest();
            // validate the request
            MarketDataRequest.validate(request);
            // translate the request to an appropriate proprietary format
            D data = xlator.fromDataRequest(request);
            if(request instanceof MarketDataRequest) {
                processResponse(doMarketDataRequest(data), 
                                inToken);
                return true;
            }
            // Unhandled message type
            Messages.ERROR_MARKET_DATA_FEED_UNKNOWN_MESSAGE_TYPE.error(this);
            return false;
        } catch (Exception e) {
            thrownException = e;
            Messages.ERROR_MARKET_DATA_FEED_EXECUTION_FAILED.error(this,
                                                                   e);
            Messages.ERROR_MARKET_DATA_FEED_EXECUTION_FAILED.error(org.marketcetera.core.Messages.USER_MSG_CATEGORY,
                                                                   e);
            return false;
        } finally {
            afterDoExecute(inToken, 
                           thrownException);
        }
    }
    /**
     * Processes the handles returned from a feed request.
     * 
     * <p>The given handles will be associated with the given token.  Later, when data is returned from the feed via {@link #dataReceived(String, Object)},
     * the handles stored here are used to associate the data with the token.
     *
     * @param inHandles a <code>List&lt;String&gt;</code> value containing the handles returned from the feed to associate with the given token
     * @param inToken a <code>T</code> value to which to associate the handles
     */
    private void processResponse(List<String> inHandles,
                                 T inToken)
    {
        if(inHandles == null ||
           inHandles.size() == 0) {
            Messages.WARNING_MARKET_DATA_FEED_NOT_RETURN_HANDLE.warn(this);
        } else {
            mHandleHolder.addHandles(inToken,
                                     inHandles);
        }
    }
    /**
     * Executes the steps necessary when the connection to the feed has been
     * resumed.
     */
    protected final void doReconnectToFeed()
    {
        SLF4JLoggerProxy.debug(this,
                               "Reconnecting to feed({}), resubmitting queries", //$NON-NLS-1$
                               isLoggedIn() ? "logged in" : "not logged in"); //$NON-NLS-1$ //$NON-NLS-2$
        // retrieve all the tokens for active queries
        Collection<T> tokens = mHandleHolder.getTokens();
        // these tokens need to be resubmitted to the feed
        // the token must still be valid because the various subscribers
        //  will still be listening.  the query represented by the token
        //  needs to be resubmitted to the feed and the resulting handles
        //  need to be added to the collection.
        for(T token : tokens) {
            // remove old record of this query
            doHandleInvalidation(mHandleHolder.removeToken(token));
            // set status to indicate it is being resubmitted
            token.setStatus(Status.RESUBMITTING);
            // newly submit query, allowing its new handles to be added
            SLF4JLoggerProxy.debug(this, "Resubmitting {}",token); //$NON-NLS-1$
            try {
                submitToken(token);
            } catch (TimeoutException e) {
                Messages.ERROR_TOKEN_RESUBMIT_FAILED.error(this, e, token);
                break;
            } catch (FeedException e) {
                Messages.ERROR_TOKEN_RESUBMIT_FAILED.warn(this, e, token);
            }
        }
    }
    /**
     * Executes the invalidation of the handles passed.
     * 
     * <p>The feed will be notified that each handle passed is invalid.
     * 
     * @param inHandles a <code>List&lt;MarketDataHandle&gt;</code> value
     */
    private void doHandleInvalidation(List<MarketDataHandle> inHandles)
    {
        for(MarketDataHandle handle : inHandles) {
            try {
                doCancel(handle.decompose());
            } catch (Exception e) {
                Messages.WARNING_MARKET_DATA_FEED_CANNOT_CANCEL_SUBSCRIPTION.warn(this, e, handle);
            }
        }
    }
    /**
     * Returns a {@link MarketDataHandle} instance encapsulating the given data feed proto-handle.
     *
     * @param inSeed a <code>String</code> value containing the proto-handle returned from
     *   a data feed
     * @return a <code>MarketDataHandle</code> value
     */
    private MarketDataHandle compose(String inSeed)
    {
        return new MarketDataHandle(inSeed);
    }
    /**
     * Returns the data feed proto-handle from the given {@link MarketDataHandle}.
     *
     * @param inHandle a <code>MarketDataHandle</code> value
     * @return a <code>String</code> value
     */
    private String decompose(MarketDataHandle inHandle)
    {
        return inHandle.decompose();
    }

    /**
     * Returns the provider name associated with this feed.
     *
     * @return a <code>String</code> value
     */
    protected final String getProviderName()
    {
        return mProviderName;
    }
    /**
     * Causes the given token to be executed.
     *
     * @param inToken a <code>T</code> value
     * @return a <code>T</code> value
     * @throws TimeoutException if the execute times out before returning 
     * @throws FeedException if an exception occurs
     */
    private T submitToken(T inToken) 
        throws FeedException, TimeoutException
    {
        // construct the object that will be invoked by the ThreadPool - this object is used
        //  to execute the query represented by the token
        ExecutorThread thread = new ExecutorThread(inToken);        
        try {
            // this command executes the request using the connector.  the connector has all the information it needs
            //  to execute the request because of the token.  this command is asynchronous.
            Future<T> response = sPool.submit(thread);
            // wait for the response to be returned for a max of 1 minute.  this doesn't mean that the results are back yet, just that
            //  the request has been received and acknowledged by the feed service.  if you can dig it, it's an
            //  asynchronous request for an asynchronous response.
            return response.get(getTimeout(),
                                TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw e;
        } catch (Exception e) {
            throw new FeedException(e,
                                    Messages.ERROR_MARKET_DATA_FEED_EXECUTION_FAILED);
        }
    }
    /*
     * the following are private helper classes
     */
    /**
     * Helper that performs the actual execution in the threadpool. 
     *
     * <p>This class exists for two reasons:
     * <ol>
     *   <li>Implement {@link Callable} which includes a public method for
     *       the benefit of the {@link ExecutorService} (threadpool) but
     *       without requiring {@link AbstractMarketDataFeed}
     *       to implement {@link Callable#call()}, which should not be
     *       called by external classes.</li>
     *   <li>Make sure that a specific pair of {@link T} and {@link C}
     *       corresponds to the {@link Callable#call()} method that uses
     *       them</li>
     * </ol>
     * 
     * <p>Note that this class is intentionally declared <code>non-static</code>
     * because it must have access to the parent's <code>T</code> and <code>C</code>
     * types and must call several non-static methods on the parent.
     *
     * @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
     * @version $Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $
     * @since 0.5.0
     */
    @ClassVersion("$Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $") //$NON-NLS-1$
    private class ExecutorThread
        implements Callable<T>
    {
        /**
         * the token representing the request and its responses
         */
        private final T mToken;
        /**
         * Create a new ExecutorThread instance.
         *
         * @param inToken a <code>T</code> value
         */
        private ExecutorThread(T inToken)
        {
            mToken = inToken;
        }       
        /* (non-Javadoc)
         * @see java.util.concurrent.Callable#call()
         */
        public T call()
                throws Exception
        {
            T token = mToken;

            token.setStatus(MarketDataFeedToken.Status.RUNNING);
            if(!isLoggedIn()) {
                // bail out expressing sadness
                token.setStatus(MarketDataFeedToken.Status.LOGIN_FAILED);
                return token;
            }
            // feed is logged in
            // do any initialization required
            boolean succeeded = false;
            try {
                succeeded = doInitialize();
            } catch (Exception e) {
                Messages.WARNING_MARKET_DATA_FEED_CANNOT_INITIALIZE.warn(AbstractMarketDataFeed.DATAFEED_STATUS_MESSAGES, 
                                                                         e);
                succeeded = false;
            }
            if(!succeeded) {
                token.setStatus(MarketDataFeedToken.Status.INITIALIZATION_FAILED);
                return token;
            }
            // feed should be ready for commands
            // execute command, wait for status response, not responses to the actual command
            succeeded = false;
            try {
                succeeded = doExecute(token);
            } catch (Exception e) {
                Messages.WARNING_MARKET_DATA_FEED_CANNOT_EXEC_COMMAND.warn(AbstractMarketDataFeed.DATAFEED_STATUS_MESSAGES,
                                                                           e);
                succeeded = false;
            }
            if(!succeeded) {
                token.setStatus(MarketDataFeedToken.Status.EXECUTION_FAILED);
                return token;
            }
            token.setStatus(MarketDataFeedToken.Status.ACTIVE);
            return token;
        }        
    }
    /**
     * Encapsulates the handles and tokens collections.
     *
     * <p>Note that this class is declared non-static intentionally in order
     * to use the parent class's generic types.
     *
     * @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
     * @version $Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $
     * @since 0.5.0
     */    
    @ClassVersion("$Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $") //$NON-NLS-1$
    private class HandleHolder
    {
        /**
         * stores client response handles by token - note that these two collections must be kept in sync
         */
        private final Map<T,List<MarketDataHandle>> mHandlesByToken = new HashMap<T,List<MarketDataHandle>>();
        /**
         * stores tokens by handle - note that these two collections must be kept in sync
         */
        private final Map<MarketDataHandle,T> mTokensByHandle = new HashMap<MarketDataHandle,T>();
        /**
         * object used for synchronization lock for these two collections
         */
        private final Object mLock = new Object();
        /**
         * Create a new HandleHolder instance.
         */
        private HandleHolder()
        {            
        }
        /**
         * Records the given handles as associated to the given token.
         *
         * @param inToken a <code>T</code> value
         * @param inHandles a <code>List&lt;String&gt;</code> value
         */
        private void addHandles(T inToken,
                                List<String> inHandles)
        {
            // by convention, synchronization for mHandlesByToken and mTokensByHandle
            //  is performed on mLock to avoid deadlock - do not access
            //  mTokensByHandle or mHandlesByToken without synchronizing mLock
            synchronized(mLock) {
                List<MarketDataHandle> marketDataHandles = mHandlesByToken.get(inToken);
                if(marketDataHandles == null){
                    marketDataHandles = new ArrayList<MarketDataHandle>();
                }
                for(String handle : inHandles) {
                    MarketDataHandle mdHandle = compose(handle);
                    marketDataHandles.add(mdHandle);
                    mTokensByHandle.put(mdHandle, 
                                        inToken);
                }
                mHandlesByToken.put(inToken, 
                                    marketDataHandles);
            }
        }
        /**
         * Returns the token associated with the given handle.
         *
         * @param inMarketDataHandle a <code>MarketDataHandle</code> value
         * @return a <code>T</code> value or null if no token is associated
         *   with the given handle
         */
        private T getToken(MarketDataHandle inMarketDataHandle)
        {
            // by convention, synchronization for mHandlesByToken and mTokensByHandle
            //  is performed on mLock to avoid deadlock - do not access
            //  mTokensByHandle or mHandlesByToken without synchronizing mLock
            synchronized(mLock) {
                return mTokensByHandle.get(inMarketDataHandle);
            }
        }
        /**
         * Removes the given token and its handles from the handle list.
         *
         * @param inToken a <code>T</code> value
         * @return a <code>List&lt;MarketDataHandle&gt;</code> value containing the handles
         *   associated with the token
         */
        private List<MarketDataHandle> removeToken(T inToken)
        {
            List<MarketDataHandle> handles;
            // by convention, synchronization for mHandlesByToken and mTokensByHandle
            //  is performed on mLock to avoid deadlock - do not access
            //  mTokensByHandle or mHandlesByToken without synchronizing mLock
            synchronized(mLock) {
                handles = mHandlesByToken.remove(inToken);
                if(handles == null) {
                    return new ArrayList<MarketDataHandle>();
                }
                for(MarketDataHandle handle : handles) {
                    mTokensByHandle.remove(handle);
                }
            }
            return handles;
        }
        /**
         * Gets the handles associated with the given token.
         *
         * @param inToken a <code>T</code> value
         * @return a <code>List&lt;MarketDataHandle&gt;</code> value
         */
        private List<MarketDataHandle> getHandles(T inToken)
        {            
            synchronized(mLock) {
                List<MarketDataHandle> handles = mHandlesByToken.get(inToken);
                if(handles == null) {
                    handles = new ArrayList<MarketDataHandle>();
                }
                return handles;
            }
        }
        /**
         * Returns all the tokens in no particular order.
         * 
         * @return a <code>Collection&lt;T&gt;</code> value
         */
        private List<T> getTokens()
        {
            synchronized(mLock) {
                return new ArrayList<T>(mTokensByHandle.values());
            }
        }
   }
    /**
     * A unique handle to associate data feed requests with responses.
     *
     * <p>The handle created is guaranteed to be unique within the scope of all
     * data feeds in the current JVM run iff:
     * <ol>
     *   <li>all proto-handles returned by {@link AbstractMarketDataFeed#doMarketDataRequest(Object)} 
     *       are unique within the scope of the relevant feed in the current JVM run</li>
     *   <li>the set of values returned by {@link IMarketDataFeedFactory#getProviderName()} from all
     *       data feeds contains no duplicates</li>
     * </ol>
     *       
     * @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
     * @version $Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $
     * @since 0.5.0
     */
    @ClassVersion("$Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $") //$NON-NLS-1$
    private class MarketDataHandle
    {
        /**
         * the handle value
         */
        private final String mHandle;
        /**
         * the handle from the data feed itself
         */
        private final String mProtoHandle;
        /**
         * Create a new MarketDataHandle instance.
         *
         * @param inHandle a <code>String</code> value containing a value
         *   meaningful to the originating data feed which can be used
         *   to refer to a unique data feed request
         * @throws NullPointerException if <code>inHandle</code> is null
         */
        private MarketDataHandle(String inHandle)
        {
            if(inHandle == null) {
                throw new NullPointerException();
            }
            mProtoHandle = inHandle;
            mHandle = String.format("%s-%s", //$NON-NLS-1$
                                    getProviderName(),
                                    inHandle);
        }
        /**
         * Returns the proto-handle used to originally create this object.
         *
         * @return a <code>String</code> value
         */
        private String decompose()
        {
            return new String(mProtoHandle);
        }
        /* (non-Javadoc)
         * @see java.lang.Object#toString()
         */
        public String toString()
        {
            return new String(mHandle);
        }
        /* (non-Javadoc)
         * @see java.lang.Object#hashCode()
         */
        public int hashCode()
        {
            final int PRIME = 31;
            int result = 1;
            result = PRIME * result + ((mHandle == null) ? 0 : mHandle.hashCode());
            return result;
        }
        /* (non-Javadoc)
         * @see java.lang.Object#equals(java.lang.Object)
         */
        public boolean equals(Object obj)
        {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            final MarketDataHandle other = (MarketDataHandle) obj;
            if (mHandle == null) {
                if (other.mHandle != null)
                    return false;
            } else if (!mHandle.equals(other.mHandle))
                return false;
            return true;
        }
    }
    /**
     * Wraps the {@link IFeedComponentListener} for this feed.
     *
     * <p>The wrapper translates {@link ISubscriber} methods to
     * the {@link IFeedComponentListener} objects.
     * 
     * @author <a href="mailto:colin@marketcetera.com">Colin DuPlantis</a>
     * @version $Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $
     * @since 0.5.0
     */
    @ClassVersion("$Id: AbstractMarketDataFeed.java 10833 2009-10-26 21:39:40Z colin $") //$NON-NLS-1$
    private static class FeedComponentListenerWrapper
        implements ISubscriber
    {
        /**
         * the feed component listener to which to transmit feed status updates
         */
        private final IFeedComponentListener mListener;
        /**
         * the feed component (data feed) whose status is changing
         */
        private final IFeedComponent mParent;
        /**
         * Create a new FeedComponentListenerWrapper instance.
         *
         * @param inListener an <code>IFeedComponentListener</code> value
         * @param inParent an <code>IFeedComponent</code> value
         * @throws NullPointerException if either the listener or parent are null
         */
        private FeedComponentListenerWrapper(IFeedComponentListener inListener,
                                             IFeedComponent inParent)
        {
            if(inListener == null ||
               inParent == null) {
                throw new NullPointerException();
            }
            mListener = inListener;
            mParent = inParent;
        }
        /* (non-Javadoc)
         * @see org.marketcetera.core.publisher.ISubscriber#isInteresting(java.lang.Object)
         */
        public boolean isInteresting(Object inData)
        {
            return true;
        }
        /* (non-Javadoc)
         * @see org.marketcetera.core.publisher.ISubscriber#publishTo(java.lang.Object)
         */
        public void publishTo(Object inData)
        {
            mListener.feedComponentChanged(mParent);
        }
        /* (non-Javadoc)
         * @see java.lang.Object#hashCode()
         */
        public int hashCode()
        {
            final int PRIME = 31;
            int result = 1;
            result = PRIME * result + ((mListener == null) ? 0 : mListener.hashCode());
            return result;
        }
        /* (non-Javadoc)
         * @see java.lang.Object#equals(java.lang.Object)
         */
        public boolean equals(Object obj)
        {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            final FeedComponentListenerWrapper other = (FeedComponentListenerWrapper) obj;
            if (mListener == null) {
                if (other.mListener != null)
                    return false;
            } else if (!mListener.equals(other.mListener))
                return false;
            return true;
        }
    }
}
